rxjava zip操作符顺序 rxjava merge和zip的区别

您所在的位置:网站首页 rxjava zip操作符 rxjava zip操作符顺序 rxjava merge和zip的区别

rxjava zip操作符顺序 rxjava merge和zip的区别

2024-06-09 08:11| 来源: 网络整理| 查看: 265

3.6 Zip

在上一节中说到了Map与FlatMap操作符,Map是将处理事件的类型进行转换,而FlatMap是将一个事件转换成多个事件进行处理。既然有将一个事件转换成多个事件进行处理,那也会有将多个事件转换成一个事件进行处理,这个操作符就是Zip。

还是老规矩,使用图片加深对概念的理解。

rxjava zip操作符顺序 rxjava merge和zip的区别_rxjava zip操作符顺序

可以很直观的看到,使用Zip操作符可以将两个事件合成一个事件。

当然,在合并的过程中,我们有一些规则需要了解一下: 1.组合的过程是分别从两根水管里各取出一个事件来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利来进行的, 也就是说不会出现圆形1事件和三角形B事件进行合并, 也不可能出现圆形2和三角形A进行合并的情况。 2.最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了。

然后是使用Zip操作符的代码:

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { Log.i(TAG, "subscribe: 1"); emitter.onNext(1); Log.i(TAG, "subscribe: 2"); emitter.onNext(2); Log.i(TAG, "subscribe: 3"); emitter.onNext(3); Log.i(TAG, "subscribe: 4"); emitter.onNext(4); emitter.onComplete(); } }); Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { Log.i(TAG, "subscribe: One"); emitter.onNext("One"); Log.i(TAG, "subscribe: Two"); emitter.onNext("Two"); Log.i(TAG, "subscribe: Three"); emitter.onNext("Three"); Log.i(TAG, "subscribe: Four"); emitter.onNext("Four"); emitter.onComplete(); } }); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + " " + s; } }).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } });

然后我们看一下打印的Log

8468-8468 I/RxJavaTestHaHa: onSubscribe: 8468-8468 I/RxJavaTestHaHa: subscribe: 1 8468-8468 I/RxJavaTestHaHa: subscribe: 2 8468-8468 I/RxJavaTestHaHa: subscribe: 3 8468-8468 I/RxJavaTestHaHa: subscribe: 4 8468-8468 I/RxJavaTestHaHa: subscribe: One 8468-8468 I/RxJavaTestHaHa: onNext: 1 One 8468-8468 I/RxJavaTestHaHa: subscribe: Two 8468-8468 I/RxJavaTestHaHa: onNext: 2 Two 8468-8468 I/RxJavaTestHaHa: subscribe: Three 8468-8468 I/RxJavaTestHaHa: onNext: 3 Three 8468-8468 I/RxJavaTestHaHa: subscribe: Four 8468-8468 I/RxJavaTestHaHa: onNext: 4 Four 8468-8468 I/RxJavaTestHaHa: onComplete:

看样子好像是先运行observable1,然后再和observable2中的字符串组合打印出结果。为什么会这样,因为它们是运行在主线程的,这样就会有先后顺序。

如果让它们运行在不同的线程,会变成什么样子,我们可以试一下。

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { Log.i(TAG, "subscribe: 1"); emitter.onNext(1); Log.i(TAG, "subscribe: 2"); emitter.onNext(2); Log.i(TAG, "subscribe: 3"); emitter.onNext(3); Log.i(TAG, "subscribe: 4"); emitter.onNext(4); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { Log.i(TAG, "subscribe: One"); emitter.onNext("One"); Log.i(TAG, "subscribe: Two"); emitter.onNext("Two"); Log.i(TAG, "subscribe: Three"); emitter.onNext("Three"); Log.i(TAG, "subscribe: Four"); emitter.onNext("Four"); emitter.onComplete(); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + " " + s; } }).subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: " + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } });

然后看一下打印出的Log。

10048-10048 I/RxJavaTestHaHa: onSubscribe: 10048-10081 I/RxJavaTestHaHa: subscribe: 1 10048-10081 I/RxJavaTestHaHa: subscribe: 2 10048-10081 I/RxJavaTestHaHa: subscribe: 3 10048-10082 I/RxJavaTestHaHa: subscribe: One 10048-10081 I/RxJavaTestHaHa: subscribe: 4 10048-10082 I/RxJavaTestHaHa: subscribe: Two 10048-10082 I/RxJavaTestHaHa: subscribe: Three 10048-10081 I/RxJavaTestHaHa: onNext: 1 One 10048-10082 I/RxJavaTestHaHa: subscribe: Four 10048-10081 I/RxJavaTestHaHa: onNext: 2 Two 10048-10081 I/RxJavaTestHaHa: onNext: 3 Three 10048-10082 I/RxJavaTestHaHa: onNext: 4 Four 10048-10082 I/RxJavaTestHaHa: onComplete:

虽然不是数字和英文字母交替然后打印,但是也可以看出与全都运行在主线程的区别。

在这里的话,RxJava操作符的相关知识就差不多了,接下来是一些有趣的东西。

3.7 水缸

在上面的Zip操作符使用,我们使用了A,B两个管道。如果A发送事件速度快,发送了100个,而B发送速度慢,只发送了1个。在这种情况下,A的事件会存放在哪里,不会直接丢弃,而是要找一个地方存储起来,这个地方就是水缸。

rxjava zip操作符顺序 rxjava merge和zip的区别_i++_02

从图中可以看出,蓝色的区域就是水缸。

我们使用代码模拟一下这个过程。

Observable observable1 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()); Observable observable2 = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("One"); } }).subscribeOn(Schedulers.io()); Observable.zip(observable1, observable2, new BiFunction() { @Override public String apply(Integer integer, String s) throws Exception { return integer + " " + s; } }).subscribe(new Consumer() { @Override public void accept(String s) throws Exception { Log.i(TAG, "accept: "+s); } });

然后看一下内存占用图,果然,0.5GB的内存占用,果然是炸了,过了一会App也自动退了。

rxjava zip操作符顺序 rxjava merge和zip的区别_rxjava zip操作符顺序_03

我们找一下事件的起因,先看一下下面的代码。

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); } });

这个代码和上面的代码本质上是相同的,都是发送无限的数据。

但是内存占有图却出人意料,相当平静。

rxjava zip操作符顺序 rxjava merge和zip的区别_i++_04

原因很简单,因为上下游运行在同一线程,这时是一个同步的订阅关系。下游处理完一个事件后上游才会继续发送。

下面的代码更能解释这种关系。

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.i(TAG, "accept: " + integer); } });

内存占用图依然很平静。

rxjava zip操作符顺序 rxjava merge和zip的区别_i++_05

如果是在不同线程,结果会变成怎样,我们来看一下。

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.i(TAG, "accept: " + integer); } });

结果图是下面这样,内存又爆了。

rxjava zip操作符顺序 rxjava merge和zip的区别_i++_06

这是因为当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要我们刚才说的水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了.

同步与异步的情况可以用以下两张图片表示:

同步:

rxjava zip操作符顺序 rxjava merge和zip的区别_ide_07

异步:

rxjava zip操作符顺序 rxjava merge和zip的区别_rxjava zip操作符顺序_08

从图中可以看到,异步与同步最大的区别就是因为异步线程间无法通信的原因,多了一个缓存数据的水缸。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3